JDK Source Code Analysis: The ConcurrentHashMap Class

ConcurrentHashMap is a member of the java.util.concurrent family. Because it supports concurrent operations efficiently, it is very widely used; classic open-source frameworks such as Spring rely on it for their internal data structures. Compared with its thread-safe predecessor Hashtable it comes out ahead, because its locking is far more fine-grained: Hashtable simply adds a synchronized lock to almost every method, which inevitably hurts performance.

The source code analyzed in this article is from JDK 8, which differs greatly from the JDK 6 version. The approach to thread safety has changed completely: the Segment (lock segment) concept has been abandoned in favor of a brand-new implementation built on CAS operations. It follows the same design as the HashMap of the same era, with an underlying structure of array + linked list + red-black tree, but to support concurrency it adds many auxiliary inner classes such as TreeBin and Traverser.

Constants

// Maximum table capacity
private static final int MAXIMUM_CAPACITY = 1 << 30;

// Default initial capacity
private static final int DEFAULT_CAPACITY = 16;

// Largest possible array size; needed by toArray() and related methods
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

// Default concurrency level; retained only for compatibility with previous versions
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

// Load factor
private static final float LOAD_FACTOR = 0.75f;

// Treeify threshold: when a bin's node count reaches 8, the linked list is converted to a red-black tree
static final int TREEIFY_THRESHOLD = 8;

// Untreeify threshold: a tree bin with 6 or fewer nodes is converted back to a linked list (during transfer, the lc/hc counters track the TreeNode counts of the low/high bins; a count <= UNTREEIFY_THRESHOLD triggers untreeify(lo))
static final int UNTREEIFY_THRESHOLD = 6;

// Minimum table capacity for treeification; below this the table is resized instead
static final int MIN_TREEIFY_CAPACITY = 64;

/**
* Minimum number of rebinnings per transfer step. Ranges are
* subdivided to allow multiple resizer threads. This value
* serves as a lower bound to avoid resizers encountering
* excessive memory contention. The value should be at least
* DEFAULT_CAPACITY.
*/
private static final int MIN_TRANSFER_STRIDE = 16;

/**
* The number of bits used for generation stamp in sizeCtl.
* Must be at least 6 for 32bit arrays.
*/
private static int RESIZE_STAMP_BITS = 16;

// (1 << 16) - 1: the maximum number of threads that can help resize
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

// 32 - 16 = 16: the bit shift for recording the size stamp in sizeCtl
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

/*
* Encodings for Node hash fields. See above for explanation.
*/
static final int MOVED = -1; // hash for forwarding nodes
static final int TREEBIN = -2; // hash for roots of trees
static final int RESERVED = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

// Number of available processors
static final int NCPU = Runtime.getRuntime().availableProcessors();

Member Variables

/**
* The array of bins. Lazily initialized upon first insertion.
* Size is always a power of two. Accessed directly by iterators.
*/
transient volatile Node<K,V>[] table;

/**
* The next table to use; non-null only while resizing.
*/
private transient volatile Node<K,V>[] nextTable;

/**
* Base counter value, used mainly when there is no contention,
* but also as a fallback during table initialization
* races. Updated via CAS.
*/
private transient volatile long baseCount;

/**
* Table initialization and resizing control. When negative, the
* table is being initialized or resized: -1 for initialization,
* else -(1 + the number of active resizing threads). Otherwise,
* when table is null, holds the initial table size to use upon
* creation, or 0 for default. After initialization, holds the
* next element count value upon which to resize the table.
*/
private transient volatile int sizeCtl;

/**
* The next table index (plus one) to split while resizing.
*/
private transient volatile int transferIndex;

/**
* Spinlock (locked via CAS) used when resizing and/or creating CounterCells.
*/
private transient volatile int cellsBusy;

/**
* Table of counter cells. When non-null, size is a power of 2.
*/
private transient volatile CounterCell[] counterCells;

// views
private transient KeySetView<K,V> keySet;
private transient ValuesView<K,V> values;
private transient EntrySetView<K,V> entrySet;

  • table: defaults to null and is initialized lazily on the first insertion; by default it is an array of size 16 that stores Node entries, and after any resize its size is always a power of two;
  • nextTable: defaults to null; the new array created during a resize, twice the size of the original;
  • baseCount: the base element counter;
  • sizeCtl: a control flag that coordinates initialization and resizing of the table; its meaning depends on its value (see the sketch after this list):
    negative: -1 means the table is being initialized; -N means N-1 threads are currently resizing;
    0 (the default): the table has not been initialized yet;
    positive: the initial table size to use, or, after initialization, the element count at which the next resize is triggered, i.e. a resize threshold. As we will see later, after initialization it is always 0.75 times the current capacity, matching the load factor.
  • transferIndex: the next table index (plus one) to split while resizing;
  • cellsBusy: a spinlock (acquired via CAS) used when resizing or creating CounterCells;
  • counterCells: the table of CounterCells.
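
The following is a small illustrative sketch, not JDK code: a hypothetical helper that restates the documented sizeCtl states listed above. Note that during an actual resize the negative value also encodes a resize stamp in its high bits, so the negative case is only summarized roughly.

public class SizeCtlStates {
    static String describe(int sc) {
        if (sc == -1)
            return "table is being initialized";
        if (sc < 0)
            return "table is being resized; the low bits track (1 + active resizer threads)";
        if (sc == 0)
            return "table not initialized yet; the default capacity (16) will be used";
        return "initial capacity before the first insert, or the next resize threshold: " + sc;
    }

    public static void main(String[] args) {
        System.out.println(describe(0));
        System.out.println(describe(-1));
        System.out.println(describe(12)); // e.g. 0.75 * 16 after the table is created
    }
}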

Key Inner Classes

Node

Node is the most fundamental inner class. It wraps a key-value pair, and every entry inserted into a ConcurrentHashMap is wrapped in a Node. Its definition is very similar to the one in HashMap, with a few differences: the value and next fields are declared volatile to guarantee visibility under concurrent access (similar to JDK 7's Segment-based implementation), setValue() is not allowed to change the value field directly, and a find() method is added to support map.get().

/**
* Key-value entry. This class is never exported out as a
* user-mutable Map.Entry (i.e., one supporting setValue; see
* MapEntry below), but can be used for read-only traversals used
* in bulk tasks. Subclasses of Node with a negative hash field
* are special, and contain null keys and values (but are never
* exported). Otherwise, keys and vals are never null.
*/
static class Node<K,V> implements Map.Entry<K,V> {
final int hash; // hash of the node
final K key; // key of the node
// val and next are volatile to guarantee visibility under concurrent access
volatile V val; // value of the node
volatile Node<K,V> next; // next node in the bin

Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}

public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
public final V setValue(V value) { // changing the value directly is not allowed
throw new UnsupportedOperationException();
}

public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}

/**
* Virtualized support for map.get(); overridden in subclasses.
*/
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}

TreeNode

TreeNode is the tree node class, another core data structure. When a linked list becomes too long, its nodes are converted to TreeNodes. Unlike in HashMap, however, the bin is not turned into a red-black tree directly: the nodes are wrapped as TreeNodes and placed inside a TreeBin object, and the TreeBin manages the red-black tree. Also, in ConcurrentHashMap TreeNode extends Node, rather than LinkedHashMap.Entry<K,V> as in HashMap, so a TreeNode keeps a next pointer; this makes access through a TreeBin easier.

/**
* Nodes for use in TreeBins
*/
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;

TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}

Node<K,V> find(int h, Object k) {
return findTreeNode(h, k, null);
}

/**
* Returns the TreeNode (or null if not found) for the given key
* starting at given root.
*/
// find the node with hash h and key k
final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
if (k != null) {
TreeNode<K,V> p = this;
do {
int ph, dir; K pk; TreeNode<K,V> q;
TreeNode<K,V> pl = p.left, pr = p.right;
if ((ph = p.hash) > h)
p = pl;
else if (ph < h)
p = pr;
else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
return p;
else if (pl == null)
p = pr;
else if (pr == null)
p = pl;
else if ((kc != null ||
(kc = comparableClassFor(k)) != null) &&
(dir = compareComparables(kc, k, pk)) != 0)
p = (dir < 0) ? pl : pr;
else if ((q = pr.findTreeNode(h, k, kc)) != null)
return q;
else
p = pl;
} while (p != null);
}
return null;
}
}

TreeBin

This class does not wrap the user's key and value; instead it wraps many TreeNode nodes. It stands in for the root TreeNode: in the actual ConcurrentHashMap array, the bin stores a TreeBin object rather than a TreeNode, which is a difference from HashMap. The class also carries a read-write lock.

The class encapsulates a series of methods, including putTreeVal, lockRoot, unlockRoot, removeTreeNode, balanceInsertion, and balanceDeletion. Since the TreeBin code is rather long, only the constructor is shown here (the constructor is what builds the red-black tree):

/**
* Creates bin with initial set of nodes headed by b.
*/
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null);
this.first = b;
TreeNode<K,V> r = null;
for (TreeNode<K,V> x = b, next; x != null; x = next) {
next = (TreeNode<K,V>)x.next;
x.left = x.right = null;
if (r == null) {
x.parent = null;
x.red = false;
r = x;
}
else {
K k = x.key;
int h = x.hash;
Class<?> kc = null;
for (TreeNode<K,V> p = r;;) {
int dir, ph;
K pk = p.key;
if ((ph = p.hash) > h)
dir = -1;
else if (ph < h)
dir = 1;
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0)
dir = tieBreakOrder(k, pk);
TreeNode<K,V> xp = p;
if ((p = (dir <= 0) ? p.left : p.right) == null) {
x.parent = xp;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
r = balanceInsertion(r, x);
break;
}
}
}
}
this.root = r;
assert checkInvariants(root);
}

ForwardingNode

A node class used to link the old table to the new one during a transfer. It contains a nextTable pointer that references the next table. Its key, value, and next fields are all null, and its hash is MOVED (-1). Its find() method searches in nextTable instead of searching from the node itself.

/**
* A node inserted at head of bins during transfer operations.
*/
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}

Node<K,V> find(int h, Object k) {
// loop to avoid arbitrarily deep recursion on forwarding nodes
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (;;) {
int eh; K ek;
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
if (eh < 0) {
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
return e.find(h, k);
}
if ((e = e.next) == null)
return null;
}
}
}
}

Constructors

/**
* Creates a new, empty map with the default initial table size (16).
*/
public ConcurrentHashMap() {
}

/**
* Creates a new, empty map with an initial table size
* accommodating the specified number of elements without the need
* to dynamically resize.
*
* @param initialCapacity The implementation performs internal
* sizing to accommodate this many elements.
* @throws IllegalArgumentException if the initial capacity of
* elements is negative
*/
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}

/**
* Creates a new map with the same mappings as the given map.
*
* @param m the map
*/
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}

/**
* Creates a new, empty map with an initial table size based on
* the given number of elements ({@code initialCapacity}) and
* initial table density ({@code loadFactor}).
*
* @param initialCapacity the initial capacity. The implementation
* performs internal sizing to accommodate this many elements,
* given the specified load factor.
* @param loadFactor the load factor (table density) for
* establishing the initial table size
* @throws IllegalArgumentException if the initial capacity of
* elements is negative or the load factor is nonpositive
*
* @since 1.6
*/
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}

/**
* Creates a new, empty map with an initial table size based on
* the given number of elements ({@code initialCapacity}), table
* density ({@code loadFactor}), and number of concurrently
* updating threads ({@code concurrencyLevel}).
*
* @param initialCapacity the initial capacity. The implementation
* performs internal sizing to accommodate this many elements,
* given the specified load factor.
* @param loadFactor the load factor (table density) for
* establishing the initial table size
* @param concurrencyLevel the estimated number of concurrently
* updating threads. The implementation may use this value as
* a sizing hint.
* @throws IllegalArgumentException if the initial capacity is
* negative or the load factor or concurrencyLevel are
* nonpositive
*/
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}

  • ConcurrentHashMap(): creates a new, empty map with the default initial capacity (16), load factor (0.75), and concurrencyLevel (16);
  • ConcurrentHashMap(int): creates a new, empty map with the specified initial capacity, the default load factor (0.75), and concurrencyLevel (16);
  • ConcurrentHashMap(Map<? extends K, ? extends V>): creates a new map with the same mappings as the given map;
  • ConcurrentHashMap(int, float): creates a new, empty map with the specified initial capacity and load factor and a default concurrencyLevel of 1;
  • ConcurrentHashMap(int, float, int): creates a new, empty map with the specified initial capacity, load factor, and concurrency level.

For the constructors, the requested initialCapacity is used to pick a power of two as the initial table size, which is then stored in sizeCtl. Note that the single-argument constructor does not simply round initialCapacity up to the next power of two: as the code above shows, it computes tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1), so both an initialCapacity of 15 and of 16 result in a sizeCtl of 32 (see the sketch below). If initialCapacity exceeds the allowed maximum, sizeCtl is capped at MAXIMUM_CAPACITY. It is also worth noting that the meaning of the concurrencyLevel parameter changed significantly in JDK 1.8: it no longer represents the permitted number of concurrent updaters; it is only used as a hint when computing sizeCtl. In JDK 1.8, concurrency control is applied per bin, so in effect the map allows as many concurrent writers as it has bins.
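
As a quick sanity check of the sizing described above, the following sketch (not JDK code) reproduces the logic of the single-argument constructor: tableSizeFor is copied from the source shown later in this article, and sizeCtlFor mirrors the cap computation of ConcurrentHashMap(int initialCapacity).

public class CtorSizingDemo {
    // Same algorithm as the private ConcurrentHashMap.tableSizeFor shown later in this article
    static int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
        return (n < 0) ? 1 : (n >= (1 << 30)) ? (1 << 30) : n + 1;
    }

    // Mirrors the cap computation of ConcurrentHashMap(int initialCapacity)
    static int sizeCtlFor(int initialCapacity) {
        return tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1);
    }

    public static void main(String[] args) {
        System.out.println(sizeCtlFor(15));   // 32
        System.out.println(sizeCtlFor(16));   // 32
        System.out.println(sizeCtlFor(100));  // 256
    }
}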

Core Method Analysis

Instance initialization: tableSizeFor(int c)

When a ConcurrentHashMap is instantiated with a capacity argument, the table size is adjusted based on that argument. For example, passing 100 to the single-argument constructor ends up with a table size of 256 (tableSizeFor(100 + 50 + 1) = 256), ensuring that the table size is always a power of two. The algorithm is as follows:

/**
* Returns a power of two table size for the given desired capacity.
* See Hackers Delight, sec 3.2
*/
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

Note: the ConcurrentHashMap constructors only set the sizeCtl value; they do not initialize the table itself. Initialization is deferred until the first put operation.

Table initialization: initTable()

The initialization of ConcurrentHashMap is handled mainly by initTable(). As the constructors above show, ConcurrentHashMap does very little at construction time and merely records a few parameters. The real initialization happens on insertion, for example during put, merge, compute, computeIfAbsent, or computeIfPresent. The method is defined as follows:

/**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// sizeCtl < 0 means another thread is initializing; yield and retry
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// this thread won the race: CAS sizeCtl to -1 to mark that it is initializing
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try { // perform the initialization
if ((tab = table) == null || tab.length == 0) { // table is still null or empty
// if sc > 0 use it as the table size, otherwise use the default capacity
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
// allocate the new node array
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt; // publish it as table
// element count at which the next resize happens
sc = n - (n >>> 2); // equivalent to 0.75 * n, i.e. the resize threshold
}
} finally {
sizeCtl = sc; // store the threshold back into sizeCtl
}
break;
}
}
return tab;
}

Explanation: the table size is determined by sizeCtl. If sizeCtl was never set, the table is created with the default size of 16; otherwise the table size is taken from sizeCtl.

The key to initTable() is sizeCtl. Its default value is 0; if a capacity was passed to a constructor, it holds a power of two. If the value is < 0, another thread is already initializing, so the current thread must back off. If a thread wins the right to initialize, it first sets sizeCtl to -1 via CAS to keep other threads out, and when done it sets sizeCtl to 0.75 * n, the resize threshold.

put()

put and get are the most commonly used operations. ConcurrentHashMap's put is not very different from HashMap's: the core idea is still to compute the node's position in the table from the hash value; if that slot is empty, the node is inserted directly, otherwise it is inserted into the linked list or the tree. The multi-threaded cases, however, make ConcurrentHashMap considerably more complex. Let's look at the source code first and then analyze it step by step:

public V put(K key, V value) {
return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
// neither key nor value may be null
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode()); // compute the spread hash value
int binCount = 0;
for (Node<K,V>[] tab = table;;) { // loop until the insertion succeeds
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0) // table is null or has length 0
tab = initTable(); // initialize the table
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // table is non-empty but the target bin is empty
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null))) // CAS the new node into slot i if it is still null
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED) // the head node's hash is MOVED: a resize is in progress
tab = helpTransfer(tab, f); // help move nodes to the new table
else {
V oldVal = null;
synchronized (f) { // lock the head node of the bin
if (tabAt(tab, i) == f) { // the head of bin i has not changed
if (fh >= 0) { // hash >= 0: the bin is a linked list
binCount = 1; // start counting nodes from 1
for (Node<K,V> e = f;; ++binCount) { // walk the list
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) { // same hash and equal key: an existing mapping
oldVal = e.val; // remember the old value
if (!onlyIfAbsent) // unless putIfAbsent semantics apply
e.val = value; // overwrite the value, i.e. update the node
break;
}
Node<K,V> pred = e; // remember the current node
if ((e = e.next) == null) { // reached the last node without a match
// append a new node to the end of the list
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) { // the bin is a red-black tree
Node<K,V> p;
binCount = 2; // mark that the bin is a tree
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) { // insert hash/key/value into the red-black tree
oldVal = p.val; // remember the old value
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) { // a bin was examined
if (binCount >= TREEIFY_THRESHOLD) // list length reached the treeify threshold
treeifyBin(tab, i); // convert the bin to a tree
if (oldVal != null) // an existing mapping was updated
return oldVal; // return the old value
break;
}
}
}
addCount(1L, binCount); // increment the element count (and possibly trigger a resize)
return null;
}

putVal(K key, V value, boolean onlyIfAbsent) does the following:
1. Check whether key or value is null; if so, throw an exception, otherwise go to step 2.
2. Enter an infinite for loop, then go to step 3.
3. Check whether the table has been initialized; if not, call initTable() and go back to step 2; otherwise go to step 4.
4. Compute the index i in the table from the key's hash and read the node at table[i] into f. Three cases follow:
    1) table[i] == null (the slot is empty, no collision): store the new node there with a CAS; if the CAS succeeds, exit the loop.
    2) table[i] != null (the slot is occupied, a collision), which splits into two sub-cases:
        2.1) if the hash of table[i] is MOVED, a resize is in progress, so help with the transfer;
        2.2) otherwise, if table[i] heads a linked list, insert the node into that list;
             if table[i] is a tree bin, insert the node into the tree. After a successful insertion, go to step 5.
5. If table[i] is a linked-list bin, check whether the list at index i needs to be converted to a tree; if so, call treeifyBin.

Explanation: put delegates to putVal to insert the data. The overall flow of putVal is roughly as follows.

① Check whether the key or value is null; if so, throw an exception; otherwise go to step ②.

② Compute the key's hash value and enter an infinite loop, which guarantees that the insertion eventually succeeds. If the table is null or has length 0, initialize it; otherwise go to step ③.

③ Use the key's hash to read the node at the corresponding slot of the table. If the node is null (the bin is empty), build a node from the key, value, and hash and CAS it into the bin; otherwise go to step ④.

④ If that node's hash is MOVED, help transfer the nodes of the bin (a resize is in progress); otherwise go to step ⑤.

⑤ Lock the first node of the bin (the node stored in the table) and traverse the bin. If a node with the same hash and an equal key is found, decide based on the onlyIfAbsent flag whether to update it (replace its value with the given value). If the traversal ends without finding such a node, create a new node and link it after the last node. Go to step ⑥.

⑥ If binCount has reached the treeify threshold, convert the bin to a red-black tree. Finally, call addCount to increment the element count.
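
To make the onlyIfAbsent distinction above concrete, here is a small usage sketch (not JDK source): put always overwrites an existing mapping and returns the old value, while putIfAbsent (which calls putVal with onlyIfAbsent = true) keeps the existing mapping.

import java.util.concurrent.ConcurrentHashMap;

public class PutValDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

        System.out.println(map.put("a", 1));          // null: no previous mapping
        System.out.println(map.put("a", 2));          // 1: old value returned, mapping overwritten
        System.out.println(map.putIfAbsent("a", 3));  // 2: existing mapping kept (onlyIfAbsent = true)
        System.out.println(map.get("a"));             // 2
    }
}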

The putVal function involves the following helpers: initTable, tabAt, casTabAt, helpTransfer, putTreeVal, treeifyBin, and addCount. These functions are analyzed below.

tabAt()

@SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

Explanation: this function returns the node at index i of the table array. The read goes through the Unsafe object with volatile semantics; the second argument of getObjectVolatile is the memory offset corresponding to index i.

casTabAt()

static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

Explanation: this function checks whether the node at index i of the table is c; if so, it atomically replaces it with v, otherwise no swap is performed.
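
As a rough analogy (this is not how ConcurrentHashMap is implemented, which uses Unsafe directly), casTabAt behaves like compareAndSet on an array slot and tabAt like a volatile read of that slot. java.util.concurrent.atomic.AtomicReferenceArray offers the same semantics:

import java.util.concurrent.atomic.AtomicReferenceArray;

public class CasTabAtAnalogy {
    public static void main(String[] args) {
        AtomicReferenceArray<String> tab = new AtomicReferenceArray<>(16);

        boolean ok1 = tab.compareAndSet(3, null, "first");  // slot 3 is null, so the swap succeeds
        boolean ok2 = tab.compareAndSet(3, null, "second"); // slot 3 is no longer null, so it fails
        System.out.println(ok1 + " " + ok2 + " " + tab.get(3)); // true false first
    }
}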

helpTransfer()

/**
* Helps transfer if a resize is in progress.
*/
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
// the table is non-null, the head node is a ForwardingNode, and its nextTable is non-null
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) { // a resize is still in progress
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { // CAS to register this thread as one more resizer
// move nodes from table to nextTab
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}

  
Explanation: this function is used during a resize to let the current thread help move nodes from table to nextTable.

putTreeVal()

/**
* Finds or adds a node.
* @return null if added
*/
final TreeNode<K,V> putTreeVal(int h, K k, V v) {
Class<?> kc = null;
boolean searched = false;
for (TreeNode<K,V> p = root;;) {
int dir, ph; K pk;
if (p == null) {
first = root = new TreeNode<K,V>(h, k, v, null, null);
break;
}
else if ((ph = p.hash) > h)
dir = -1;
else if (ph < h)
dir = 1;
else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
return p;
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0) {
if (!searched) {
TreeNode<K,V> q, ch;
searched = true;
if (((ch = p.left) != null &&
(q = ch.findTreeNode(h, k, kc)) != null) ||
((ch = p.right) != null &&
(q = ch.findTreeNode(h, k, kc)) != null))
return q;
}
dir = tieBreakOrder(k, pk);
}

TreeNode<K,V> xp = p;
if ((p = (dir <= 0) ? p.left : p.right) == null) {
TreeNode<K,V> x, f = first;
first = x = new TreeNode<K,V>(h, k, v, f, xp);
if (f != null)
f.prev = x;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
if (!xp.red)
x.red = true;
else {
lockRoot();
try {
root = balanceInsertion(root, x);
} finally {
unlockRoot();
}
}
break;
}
}
assert checkInvariants(root);
return null;
}

Explanation: this function adds the given hash, key, and value to the red-black tree. If a new node is added, it returns null; if a node with that key already exists, it returns that node so the caller can update its value.

treeifyBin()

/**
* Replaces all linked nodes in bin at given index unless table is
* too small, in which case resizes instead.
*/
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) { // table is non-null
if ((n = tab.length) < MIN_TREEIFY_CAPACITY) // the table is still smaller than the minimum treeify capacity
// resize instead: spreading the overlong bin's nodes over a larger table fixes the problem
// (treeifyBin was triggered because the bin's node count exceeded the threshold)
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { // the bin has a head node and it is a list node (hash >= 0)
synchronized (b) { // lock the first node of the bin
if (tabAt(tab, index) == b) { // the head node has not changed
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) { // walk every node in the bin
// wrap the node in a new TreeNode
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null) // no predecessor yet
// p becomes the head of the TreeNode list
hd = p;
else // link p after the current tail
tl.next = p;
tl = p; // p becomes the new tail
}
// store a TreeBin built from hd into slot index of the table
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}

Explanation: this function converts the data structure of a bin into a red-black tree. Notably, if the table length has not yet reached MIN_TREEIFY_CAPACITY, a resize is performed instead; the resize redistributes all elements of the bin that triggered treeifyBin, which avoids any single bin holding too many nodes.

addCount()

/**
* Adds to count, and if table is too small and not already
* resizing, initiates transfer. If already resizing, helps
* perform transfer if work is available. Rechecks occupancy
* after a transfer to see if another resize is already needed
* because resizings are lagging additions.
*
* @param x the count to add
* @param check if <0, don't check resize, if <= 1 only check if uncontended
*/
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// counterCells is non-null, or the CAS on baseCount failed
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true; // flag: no contention observed so far
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}

Explanation: this function adds x (1 for a put) to the map's element count, either on baseCount or on a CounterCell when there is contention, and then checks whether the new count has reached sizeCtl, in which case it starts a resize or helps an ongoing one.
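
The counting scheme that addCount relies on can be summarized with the following simplified sketch (not JDK code): the logical size of the map is baseCount plus the values of all CounterCells, which is what ConcurrentHashMap.sumCount() computes; uncontended updates CAS baseCount directly, while contended updates fall back to a per-thread CounterCell. java.util.concurrent.atomic.LongAdder uses the same striped design.

public class StripedCountSketch {
    static final class Cell { volatile long value; Cell(long v) { value = v; } }

    volatile long baseCount;     // updated by CAS when there is no contention
    volatile Cell[] counterCells; // contended updates land in one of these cells

    long sumCount() {
        long sum = baseCount;
        Cell[] as = counterCells;
        if (as != null) {
            for (Cell a : as)
                if (a != null)
                    sum += a.value;
        }
        return sum;
    }

    public static void main(String[] args) {
        StripedCountSketch c = new StripedCountSketch();
        c.baseCount = 10;                                             // uncontended additions
        c.counterCells = new Cell[] { new Cell(3), null, new Cell(2) }; // contended additions
        System.out.println(c.sumCount());                             // 15
    }
}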

get()

The get operation of ConcurrentHashMap is fairly simple: it uses the hash to locate the node with a matching key, distinguishing between the linked-list case and the tree case.

/**
* Returns the value to which the specified key is mapped,
* or {@code null} if this map contains no mapping for the key.
*
* <p>More formally, if this map contains a mapping from a key
* {@code k} to a value {@code v} such that {@code key.equals(k)},
* then this method returns {@code v}; otherwise it returns
* {@code null}. (There can be at most one such mapping.)
*
* @throws NullPointerException if the specified key is null
*/
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode()); // compute the spread hash of the key
// table is non-null, its length is > 0, and the bin for this key is non-empty
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) { // the head node's hash equals the key's hash
if ((ek = e.key) == key || (ek != null && key.equals(ek))) // keys are equal
return e.val; // return the value
}
else if (eh < 0) // negative hash: special node (TreeBin / ForwardingNode)
// delegate to the node's own find (tree lookup or search in the new table)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) { // positive hash: the bin is a linked list, traverse it
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

The overall logic of get is very clear:

  • Compute the hash value.
  • If the table is empty, return null directly.
  • Locate the Node in the table from the hash (tabAt(tab, (n - 1) & h)), then follow the linked list or the tree to find the matching node and return its value (see the sketch after this list).
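
The index computation used in the last step can be illustrated with a small sketch (not JDK code): spread() mixes the high bits of hashCode() into the low bits, and (n - 1) & h maps the hash to a bin index because n is a power of two. The spread formula and HASH_BITS below mirror the JDK 8 source.

public class BinIndexDemo {
    static final int HASH_BITS = 0x7fffffff;

    // same formula as ConcurrentHashMap.spread(int h) in JDK 8
    static int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

    public static void main(String[] args) {
        int n = 16;                         // table length (always a power of two)
        int h = spread("hello".hashCode()); // spread hash of the key
        int index = (n - 1) & h;            // equivalent to h % n when n is a power of two
        System.out.println("bin index = " + index);
    }
}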
